package com.mininglamp.kafkastream;

import java.nio.charset.StandardCharsets;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

/**
 * Project: ECommerceRecommendSystem
 * Package: com.mininglamp.kafkastream
 * <p>
 * Stream processor that filters raw log records. When a record's value contains the
 * {@code PRODUCT_RATING_PREFIX:} marker, the text that follows the marker is trimmed and
 * passed to {@link ProcessorContext#forward(Object, Object)}; nothing is forwarded for
 * any other record.
 * <p>
 * Created by ZhouPeng on 2022/01/07 18:05
 **/
public class LogProcessor implements Processor<byte[],byte[]> {

    /** Marker that identifies a product-rating log line; the payload follows it. */
    private static final String RATING_MARKER = "PRODUCT_RATING_PREFIX:";

    /** Key attached to every record this processor forwards. */
    private static final String OUTPUT_KEY = "logProcessor";

    /** Context handed over in {@link #init(ProcessorContext)}; used to forward matching records. */
    private ProcessorContext processorContext;

    /**
     * Stores the context so that {@link #process(byte[], byte[])} can forward records.
     *
     * @param context the context supplied by the caller of this processor
     */
    @Override
    public void init(ProcessorContext context) {
        this.processorContext = context;
    }

    /**
     * Core processing step: extracts the rating payload from a log line and forwards it.
     * <p>
     * The payload is the text after the first occurrence of the marker, up to the next
     * occurrence of the marker (or the end of the line), with surrounding whitespace
     * trimmed. Records with a {@code null} value, without the marker, or with an empty
     * payload are ignored.
     *
     * @param key   the incoming record key; not used
     * @param value the incoming record value holding the raw log line, may be {@code null}
     */
    @Override
    public void process(byte[] key, byte[] value) {
        // A record without a value would otherwise fail with a NullPointerException while decoding.
        if (value == null) {
            return;
        }

        // Decode with an explicit charset so the result does not depend on the JVM default.
        // NOTE(review): assumes the upstream producer writes UTF-8 - confirm.
        String input = new String(value, StandardCharsets.UTF_8);

        // Extract log lines by their fixed marker; anything else is not a rating record.
        int markerIndex = input.indexOf(RATING_MARKER);
        if (markerIndex < 0) {
            return;
        }
        System.out.println("product rating data coming!" + input);

        // Plain index arithmetic instead of split(...)[1]: split drops trailing empty
        // strings, so a line ending right after the marker used to throw
        // ArrayIndexOutOfBoundsException.
        int payloadStart = markerIndex + RATING_MARKER.length();
        int nextMarker = input.indexOf(RATING_MARKER, payloadStart);
        String payload = (nextMarker < 0
                ? input.substring(payloadStart)
                : input.substring(payloadStart, nextMarker)).trim();

        // An empty payload carries no rating data, so there is nothing to forward.
        if (payload.isEmpty()) {
            return;
        }

        processorContext.forward(
                OUTPUT_KEY.getBytes(StandardCharsets.UTF_8),
                payload.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Intentionally empty: this processor performs no scheduled work.
     *
     * @param timestamp the punctuation timestamp; not used
     */
    @Override
    public void punctuate(long timestamp) {
        // no-op
    }

    /** Intentionally empty: this processor holds no resources that need releasing. */
    @Override
    public void close() {
        // no-op
    }
}
